/**
 * @Author: liaoPengLiang
 * @File:  receiver
 * @Version: 1.0.0
 * @Date: 2023/3/24
 * @Description:
 */

package tunnelx

import (
	"fmt"
	"github.com/alibaba/MongoShake/v2/tunnel"
	"net"
	"net/rpc"
)

// RPCReader owns a net/rpc server together with the TCP address it is
// served on. Link creates the server and starts accepting connections.
type RPCReader struct {
	// server is the rpc server created by Link; it is nil until Link runs.
	server *rpc.Server
	// address is handed to net.Listen("tcp", ...) by Link.
	address string
}

// Link starts an RPC endpoint on r.address that exposes a TunnelRPC service
// backed by the given replayers.
//
// It binds a TCP listener, registers the service on a fresh rpc.Server and
// serves connections from a background goroutine, so it returns as soon as
// the endpoint is set up. The listener is not retained by RPCReader; the
// accept goroutine keeps running until the listener reports an error.
//
// The error from net.Listen is returned unchanged when the address cannot be
// bound. If the service cannot be registered, the listener is closed again
// and a wrapped registration error is returned; r.server is left untouched
// in both failure cases.
func (r *RPCReader) Link(replayers []*Replayer) (err error) {
	listener, err := net.Listen("tcp", r.address)
	if err != nil {
		return err
	}

	server := rpc.NewServer()
	if err = server.Register(NewTunnelRPC(replayers)); err != nil {
		// Do not keep the port bound when nothing will ever be served on it.
		_ = listener.Close()
		return fmt.Errorf("register tunnel rpc service on %s: %w", r.address, err)
	}
	r.server = server

	go server.Accept(listener)

	fmt.Printf("启动 mxtio-mongoshake 监听rpc地址 %s", r.address)

	return nil
}

// TunnelRPC is the receiver that RPCReader.Link registers on its rpc server.
// Its Transfer method dispatches each incoming message to one replayer.
type TunnelRPC struct {
	// replayers is indexed by message shard in Transfer.
	replayers []*Replayer
}

// NewTunnelRPC builds a TunnelRPC that dispatches to the given replayers.
// The slice is stored as passed in; it is not copied.
func NewTunnelRPC(replayers []*Replayer) *TunnelRPC {
	service := new(TunnelRPC)
	service.replayers = replayers
	return service
}
// Transfer is the RPC method that hands one tunnel message to a replayer.
//
// The replayer is selected by message.Shard. A shard number that is not a
// valid index is wrapped with modulo over the number of replayers, and the
// wrapped value is written back to message.Shard before the message is
// passed on. The value returned by the replayer's Sync is stored in
// *response.
//
// An error is returned when no replayers are configured; without that guard
// the modulo would divide by zero and panic.
func (t *TunnelRPC) Transfer(message *tunnel.TMessage, response *int64) error {
	count := uint32(len(t.replayers))
	if count == 0 {
		return fmt.Errorf("tunnel rpc: no replayer available for shard %d", message.Shard)
	}

	// Re-shard onto the available replayers; in-range shards are unchanged.
	message.Shard %= count
	*response = t.replayers[message.Shard].Sync(message, nil)

	return nil
}
